package com.digiwin.muke;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink streaming job that reads comma-separated student records from a text
 * socket, converts each well-formed line into a {@code Student} and hands the
 * result to a {@code SinkToMySQL} sink.
 *
 * <p>Expected line format: {@code id,name,age}, where {@code id} and
 * {@code age} are integers (surrounding whitespace is tolerated). Fields after
 * the third are ignored. Lines that do not match this format are reported on
 * stderr and skipped instead of failing the job.
 *
 * <p>{@code Student} and {@code SinkToMySQL} are defined elsewhere in the
 * project; presumably the sink persists each record to MySQL -- see that class
 * for the actual behavior.
 *
 * @Author yanggld
 * @Date 2021/04/26-11:27
 */
public class CustomSinkToMysql {

    /** Minimum number of comma-separated fields a line must have: id, name, age. */
    private static final int MIN_FIELDS = 3;

    /**
     * Builds and runs the pipeline: socket source ({@code c1:9999}) -> drop
     * malformed lines -> parse into {@code Student} -> {@code SinkToMySQL}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("c1", 9999);
        SingleOutputStreamOperator<Student> stuStream = source
                // Reject bad input up front: an exception thrown inside map() would
                // fail the whole streaming job because of a single malformed line.
                .filter(CustomSinkToMysql::isWellFormed)
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String value) throws Exception {
                        // Safe to index and parse: isWellFormed() already validated this line.
                        String[] splits = value.split(",");
                        Student student = new Student();
                        student.setId(Integer.parseInt(splits[0].trim()));
                        // The name is passed through unchanged (not trimmed), as before.
                        student.setName(splits[1]);
                        student.setAge(Integer.parseInt(splits[2].trim()));
                        return student;
                    }
                });
        stuStream.addSink(new SinkToMySQL());

        env.execute("CustomSinkToMysql");
    }

    /**
     * Checks whether a raw input line can be converted into a {@code Student}.
     *
     * <p>A line is well-formed when it has at least {@link #MIN_FIELDS}
     * comma-separated fields and the first and third fields are valid integers
     * after trimming. Rejected lines are reported on stderr so that dropped
     * records do not go unnoticed.
     *
     * @param line raw line read from the socket; may be {@code null}
     * @return {@code true} if the line can be parsed, {@code false} otherwise
     */
    private static boolean isWellFormed(String line) {
        if (line == null) {
            return false;
        }
        String[] fields = line.split(",");
        if (fields.length < MIN_FIELDS) {
            System.err.println("Skipping malformed record (expected id,name,age): " + line);
            return false;
        }
        try {
            Integer.parseInt(fields[0].trim());
            Integer.parseInt(fields[2].trim());
            return true;
        } catch (NumberFormatException e) {
            System.err.println("Skipping record with non-numeric id/age: " + line);
            return false;
        }
    }

}
